在前面的文章中,我們從生活中的例子理解了同步與非同步的差異,也學習了協程與事件迴圈這兩個非同步程式設計的核心概念。今天,我們要將這些理論知識付諸實踐,深入探討 Python 的 Asyncio 套件——這個讓非同步程式設計變得簡單而強大的工具箱。
還記得上一篇文章中的餐廳點餐比喻嗎?我們說協程像是一張等待叫號的號碼單,而事件迴圈就是那位忙碌的服務生。那麼 Asyncio 是什麼呢?它就是整個餐廳的營運系統——不僅有點餐機制,還包含了廚房調度、外送管理、客戶服務等完整的餐飲服務流程。在 AI 服務的開發中,Asyncio 讓我們能夠優雅地處理大量並行的推理請求、資料庫查詢和網路通信,就像一家高效率的連鎖餐廳,能同時處理數百個訂單而不會讓顧客久等。
asyncio
提供了許多功能,但對於初學者而言,掌握以下幾個關鍵工具就足以應對大部分場景。
asyncio.run(coro)
我們已經見過它了。這是執行非同步程式最簡單、最高階的入口。它會自動創建一個新的事件迴圈,執行你傳入的協程 coro
,直到它完成,最後關閉迴圈。在應用程式的主入口使用它就對了。
asyncio.create_task(coro)
這是安排非同步任務的關鍵。它會告訴事件迴圈:「嘿,這裡有一個協程,請你安排它在背景執行,不用等它,馬上把控制權還給我。」 這個函式會立即回傳一個 Task
物件,你可以把它想像成一個指向背景任務的「句柄」或「指標」。
asyncio.gather(*tasks)
這個工具用於處理多個 Task
。它會接收一個或多個 Task
物件,然後等待它們全部完成。最後,它會將所有任務的回傳值按順序收集起來,並回傳一個列表。我們在 Day 03 的範例中就用過它來並行處理多張圖片。
FastAPI 與 Asyncio 的整合是無縫的,讓我們能夠在 API 端點中充分利用 Asyncio 的所有功能。以下是一些常見的應用場景:
import asyncio
import httpx
from fastapi import FastAPI, HTTPException
app = FastAPI()
@app.get("/search")
async def search_with_timeout(query: str):
"""搜索功能 - 帶超時控制"""
try:
results = await asyncio.wait_for(
perform_external_search(query),
timeout=5.0
)
return {"results": results, "status": "success"}
except asyncio.TimeoutError:
raise HTTPException(status_code=408, detail="搜索超時")
async def perform_external_search(query: str):
"""模擬外部搜索服務 - 同時查詢多個資料來源"""
async with httpx.AsyncClient() as client:
# 並行搜索多個來源,提升回應速度
search_tasks = [
search_source_a(client, query),
search_source_b(client, query),
search_source_c(client, query)
]
results = await asyncio.gather(*search_tasks, return_exceptions=True)
# 過濾掉失敗的結果,返回成功的搜索結果
return [r for r in results if not isinstance(r, Exception)]
這個範例展示了 Asyncio 在實際 API 開發中的兩個重要應用:
超時控制 (Timeout):使用 asyncio.wait_for()
函式來為整個搜索操作設定 5 秒的超時限制。如果搜索操作超過這個時間,會自動拋出 TimeoutError
,我們可以捕獲這個錯誤並回傳適當的 HTTP status code。
錯誤處理與並行容錯:在 perform_external_search
函式中,我們使用 asyncio.gather()
並行執行多個搜索任務,並設定 return_exceptions=True
參數。這樣即使某個搜索來源失敗,其他成功的搜索結果仍然可以正常回傳,大幅提升了系統的穩定性。
在使用 Asyncio 的過程中,開發者常常會遇到一些陷阱。了解這些問題能幫助你寫出更穩定的程式碼:
問題:在非同步函式中使用阻塞操作,導致整個事件迴圈停滯。
解決方案:確保使用對應的非同步版本,如 httpx
而非 requests
,asyncio.sleep()
而非 time.sleep()
。
問題:並行執行時,一個任務的失敗可能導致整個 gather()
操作失敗。
解決方案:使用 return_exceptions=True
參數,或者為每個任務單獨處理例外。
問題:啟動過多的並行任務可能導致記憶體耗盡或系統資源不足。
解決方案:使用 asyncio.Semaphore
來限制同時執行的任務數量。
問題:忘記在協程前加上 await
,導致程式碼看似正常但實際上沒有等待執行完成。
解決方案:養成良好的程式碼檢視習慣,或使用 IDE 的語法檢查功能。
Asyncio 是 Python 非同步程式設計的核心工具,它提供了從基本協程執行到複雜任務管理的完整解決方案。在 FastAPI 開發中,熟練運用 Asyncio 能夠:
掌握 Asyncio 不僅僅是學會幾個函式的用法,更重要的是理解非同步程式設計的思維方式。明天我們將探討 Thread 與 Process,了解在什麼情況下需要使用 multi-thread 或 multi-process 來補充非同步程式設計!